In [1]:
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Use a Python 3 kernel (not PySpark) to allow you to configure the SparkSession in the notebook and include the spark-bigquery-connector required to use the BigQuery Storage API.
In [2]:
# Print the Scala version installed on the cluster.
# NOTE(review): presumably used to confirm which spark-bigquery connector
# jar to load (connector builds are Scala-version specific) — confirm.
!scala -version
In [3]:
from pyspark.sql import SparkSession

# Build (or reuse) a SparkSession, adding the spark-bigquery connector jar
# so BigQuery tables can be read via the "bigquery" data source format.
builder = (
    SparkSession.builder
    .appName('Spark DataFrames & Pandas Plotting')
    .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-latest.jar')
)
spark = builder.getOrCreate()
In [4]:
# Enable eager evaluation so a cell ending in a bare DataFrame expression
# renders as an HTML table instead of a terse repr.
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)
In [5]:
# Read one day of Wikipedia pageviews from the BigQuery public dataset;
# the "filter" option restricts the rows the connector reads.
table = "bigquery-public-data.wikipedia.pageviews_2020"
df_wiki_pageviews = (
    spark.read.format("bigquery")
    .option("table", table)
    .option("filter", "datehour >= '2020-03-01' AND datehour < '2020-03-02'")
    .load()
)
df_wiki_pageviews.printSchema()
Select the required columns and apply a filter using where() (an alias for filter()), then cache the table since it is reused by later cells.
In [6]:
# Keep only the columns used downstream, restrict to high-traffic English
# wiki rows, and cache the result since several later cells reuse it.
# (.filter is the method .where aliases, so behavior is identical.)
df_wiki_en = (
    df_wiki_pageviews
    .select("datehour", "wiki", "views")
    .filter("views > 1000 AND wiki in ('en', 'en.m')")
    .cache()
)
df_wiki_en
Out[6]:
Group by datehour and order by total page views to see the hours with the most views.
In [7]:
import pyspark.sql.functions as F

# Total views per hour; display the busiest hours first.
df_datehour_totals = (
    df_wiki_en
    .groupBy("datehour")
    .agg(F.sum('views').alias('total_views'))
)
df_datehour_totals.orderBy(F.desc('total_views'))
Out[7]:
In [8]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
%time pandas_datehour_totals = df_datehour_totals.toPandas()
pandas_datehour_totals.set_index('datehour', inplace=True)
pandas_datehour_totals.head()
Out[8]:
In [9]:
import matplotlib.pyplot as plt
Use the Pandas plot function to create a line chart
In [10]:
# Line chart of hourly total pageviews; the trailing ';' suppresses the
# Axes repr so only the figure is shown.
pandas_datehour_totals.plot.line(figsize=(12, 6));
In [11]:
import pyspark.sql.functions as F

# Pivot so each wiki value ('en', 'en.m') becomes its own column of
# hourly view totals.
df_wiki_totals = (
    df_wiki_en
    .groupBy("datehour")
    .pivot("wiki")
    .agg(F.sum('views').alias('total_views'))
)
df_wiki_totals
Out[11]:
In [12]:
# Convert the pivoted Spark DataFrame to pandas and index by timestamp
# for time-series plotting (assignment form of set_index, same frame as
# the inplace variant).
pandas_wiki_totals = df_wiki_totals.toPandas().set_index('datehour')
pandas_wiki_totals.head()
Out[12]:
In [13]:
# Line chart comparing 'en' vs 'en.m' hourly views; the trailing ';'
# suppresses the Axes repr so only the figure renders (the bare Out[]
# repr was noise).
pandas_wiki_totals.plot(kind='line',figsize=(12,6));
Out[13]:
In [14]:
# Stacked area chart of the same per-wiki hourly totals; trailing ';'
# suppresses the Axes repr so only the figure renders.
pandas_wiki_totals.plot.area(figsize=(12,6));
Out[14]: